/*
 *    Copyright 2022 The DSMS Authors.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

package com.dsms.modules.node.service.impl;

import cn.hutool.core.net.NetUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.dsms.common.constant.*;
import com.dsms.common.exception.DsmsEngineException;
import com.dsms.common.model.PageParam;
import com.dsms.common.taskmanager.TaskContext;
import com.dsms.common.taskmanager.model.AsyncStep;
import com.dsms.common.taskmanager.model.AsyncTask;
import com.dsms.common.taskmanager.model.Step;
import com.dsms.common.taskmanager.model.Task;
import com.dsms.common.taskmanager.service.IStepService;
import com.dsms.common.taskmanager.service.ITaskService;
import com.dsms.common.util.FilterUtil;
import com.dsms.common.util.PageUtil;
import com.dsms.dfsbroker.cluster.service.IClusterService;
import com.dsms.dfsbroker.node.api.NodeApi;
import com.dsms.dfsbroker.node.model.Device;
import com.dsms.dfsbroker.node.model.Node;
import com.dsms.dfsbroker.node.model.remote.OrchDeviceLsResult;
import com.dsms.dfsbroker.node.model.remote.OsdDfResult;
import com.dsms.dfsbroker.node.model.remote.ResponseMon;
import com.dsms.dfsbroker.node.service.INodeService;
import com.dsms.dfsbroker.osd.osd.api.OsdApi;
import com.dsms.dfsbroker.osd.osd.model.remote.OSDMetadataResult;
import com.dsms.dfsbroker.osd.osd.model.remote.OSDResult;
import com.dsms.dfsbroker.storagepool.api.StoragePoolApi;
import com.dsms.modules.node.model.dto.NodeDeviceManageDto;
import com.dsms.modules.node.model.vo.NodePageVO;
import com.dsms.modules.util.RemoteCallUtil;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.stream.Collectors;

@Service
@Slf4j
public class NodeServiceImpl implements INodeService {
    private static final int PG_NUM_ZERO = 0;
    private static final String OSD = ":osd.";

    @Autowired
    private NodeApi nodeApi;

    @Autowired
    private OsdApi osdApi;

    @Autowired
    private StoragePoolApi storagePoolApi;

    @Autowired
    private ITaskService taskService;

    @Autowired
    private IStepService stepService;

    @Autowired
    private IClusterService clusterService;

    @Autowired
    private TaskContext taskContext;


    @Override
    public List<Node> list() {
        List<Node> nodeList = new ArrayList<>();
        List<ResponseMon> monList = null;
        try {
            monList = nodeApi.getNodeList(RemoteCallUtil.generateRemoteRequest());
        } catch (Throwable e) {
            throw new DsmsEngineException(e, ResultCode.NODE_LISTNODE_ERROR);
        }
        if (ObjectUtils.isEmpty(monList)) {
            return nodeList;
        }
        List<OrchDeviceLsResult> orchDeviceLs = null;
        try {
            orchDeviceLs = nodeApi.getOrchDeviceLs(RemoteCallUtil.generateRemoteRequest(), null);
        } catch (Throwable e) {
            log.warn("get all devices from dsms-storage failed, fail reason:{}", e.getMessage(), e);
        }
        List<OSDMetadataResult> osdMetadatas = null;

        try {
            osdMetadatas = osdApi.getOsdMetadata(RemoteCallUtil.generateRemoteRequest());
        } catch (Throwable e) {
            log.warn("get cluster osd metadata from dsms-storage failed, fail reason:{}", e.getMessage(), e);
        }

        for (ResponseMon responseMon : monList) {
            Node node = new Node();
            String name = responseMon.getName();
            node.setNodeName(name);
            String addr = responseMon.getAddr();
            if (ObjectUtils.isEmpty(addr)) {
                continue;
            }
            //verify node server status
            String nodeIp = addr.substring(0, addr.indexOf(':'));
            String nodePort = addr.substring(addr.indexOf(':') + 1, addr.indexOf('/'));
            boolean connected = NetUtil.isOpen(new InetSocketAddress(nodeIp, Integer.parseInt(nodePort)), SystemConst.CONNECT_TIMEOUT);
            node.setNodeIp(nodeIp);
            node.setNodeStatus(connected ? NodeStatusEnum.UP.getStatus() : NodeStatusEnum.DOWN.getStatus());
            //get the number of available and all disks of the node
            nodeList.add(node);
            if (orchDeviceLs !=null && !orchDeviceLs.isEmpty()) {
                Optional<OrchDeviceLsResult> monOrchDevice = orchDeviceLs.stream().filter(orchDevice -> orchDevice.getAddr().equals(name)).findFirst();
                if (monOrchDevice.isEmpty()) {
                    continue;
                }
                List<Device> devices = Device.orchDeviceLsResultParseDevice(monOrchDevice.get());
                //filter non-local cluster osd
                List<Device> resultDevices = filterClusterOsd(osdMetadatas, name, devices);
                //count pool used osd
                long poolUsedDevice = resultDevices.stream().filter(device -> device.getUsedPool() != null).count();
                node.setPoolUsedDevice((int) poolUsedDevice);
                long count = resultDevices.stream().filter(device -> device.getOsdId() != null).count();
                node.setNodeUsedDevice((int) count);
                node.setNodeAllDevice(resultDevices.size());
                node.setDevices(resultDevices);
            }

        }

        return nodeList;
    }


    @Override
    public Page<Node> page(PageParam pageParam) {
        List<Node> list = list();
        if (pageParam instanceof NodePageVO) {
            NodePageVO nodePageVo = (NodePageVO) pageParam;
            list = list.stream()
                    .filter(node -> FilterUtil.matches(nodePageVo.getNodeName(), node.getNodeName()) && FilterUtil.matches(nodePageVo.getNodeStatus(), node.getNodeStatus()))
                    .sorted(Comparator.comparing(Node::getNodeName))
                    .collect(Collectors.toList());
        }
        return PageUtil.getPageData(list, pageParam.getPageNo(), pageParam.getPageSize());
    }


    @Override
    public Node getNodeInfo(String nodeName) {
        Node node = new Node();
        node.setNodeName(nodeName);
        Optional<OrchDeviceLsResult> monOrchDevice;
        try {
            List<OrchDeviceLsResult> orchDeviceLs = nodeApi.getOrchDeviceLs(RemoteCallUtil.generateRemoteRequest(), null);
            monOrchDevice = orchDeviceLs.stream().filter(orchDevice -> orchDevice.getAddr().equals(nodeName)).findFirst();
        } catch (Throwable e) {
            throw new DsmsEngineException(e, ResultCode.NODE_LISTNODE_ERROR);
        }
        if (monOrchDevice.isEmpty()) {
            return node;
        }

        List<OSDMetadataResult> osdMetadatas = null;

        try {
            osdMetadatas = osdApi.getOsdMetadata(RemoteCallUtil.generateRemoteRequest());
        } catch (Throwable e) {
            log.warn("get cluster osd metadata from dsms-storage failed, fail reason:{}", e.getMessage(), e);
        }

        List<Device> devices = Device.orchDeviceLsResultParseDevice(monOrchDevice.get());

        //filter out non-local cluster osd
        List<Device> resultDevices = filterClusterOsd(osdMetadatas, nodeName, devices);

        try {
            OsdDfResult osdDf = nodeApi.getOsdDf(RemoteCallUtil.generateRemoteRequest());
            List<Device> osdDfDevice = Device.osdDfResultParseDevice(osdDf);
            for (Device device : resultDevices) {
                for (Device deviceDf : osdDfDevice) {
                    if (deviceDf.getOsdId().equals(device.getOsdId())) {
                        device.setDevicePgs(deviceDf.getDevicePgs());
                        device.setDeviceUsedSize(deviceDf.getDeviceUsedSize());
                        device.setOsdStatus(deviceDf.getOsdStatus());
                    }
                }
            }
        } catch (Throwable e) {
            log.warn("get osd info from dsms-storage fail", e);
        }

        //count pool used osd
        long poolUsedDevice = resultDevices.stream().filter(device -> device.getUsedPool() != null).count();
        node.setPoolUsedDevice((int) poolUsedDevice);
        //count all osd
        long nodeUsedDevice = resultDevices.stream().filter(device -> device.getOsdId() != null).count();
        node.setNodeUsedDevice((int) nodeUsedDevice);
        node.setNodeAllDevice(resultDevices.size());
        node.setDevices(resultDevices);

        return node;
    }

    @Override
    public boolean addOsd(String nodeName, String devicePath) {

        if (!taskContext.validateTask(TaskTypeEnum.ADD_OSD, nodeName, devicePath)) {
            throw new DsmsEngineException(ResultCode.NODE_ADD_OSD_TASK_EXIST);
        }
        //1.verify nodeName exist and disk available
        if (!validateNode(nodeName)) {
            throw new DsmsEngineException(ResultCode.NODE_NOT_EXIST);
        }
        if (!validateDeviceAvailable(nodeName, devicePath)) {
            throw new DsmsEngineException(ResultCode.NODE_DEVICE_NOT_AVAILABLE);
        }

        String taskMessage = "节点管理磁盘，节点:" + nodeName + ", 磁盘路径:" + devicePath;
        Task task = new AsyncTask(
                TaskTypeEnum.ADD_OSD.getName(),
                TaskTypeEnum.ADD_OSD.getType(),
                TaskStatusEnum.QUEUE.getStatus(),
                taskMessage,
                JSONUtil.toJsonStr(JSONUtil.toJsonStr(new NodeDeviceManageDto(nodeName, devicePath))),
                TaskExclusiveEnum.EXCLUSIVE.getCode());

        if (taskService.save(task)) {
            return addOsdStepTask(task.getId(), nodeName, devicePath);
        }

        return false;
    }

    @Override
    public boolean removeOsd(Integer osdId) {

        if (!taskContext.validateTask(TaskTypeEnum.REMOVE_OSD, String.valueOf(osdId))) {
            throw new DsmsEngineException(ResultCode.NODE_REMOVE_OSD_TASK_EXIST);
        }
        //1. verify osd id exist
        if (!validateOsdId(osdId)) {
            throw new DsmsEngineException(ResultCode.NODE_OSD_NOT_EXIST);
        }

        if (validateOsdUsed(osdId)) {
            throw new DsmsEngineException(ResultCode.NODE_OSD_IS_USED);
        }

        String host;
        String path;
        try {
            List<OSDMetadataResult> osdMetadata = osdApi.getOsdMetadata(RemoteCallUtil.generateRemoteRequest());
            OSDMetadataResult rmOsdMetaData = osdMetadata.stream().filter(item -> Objects.equals(item.getId(), osdId)).findFirst().orElse(null);
            assert rmOsdMetaData != null;
            host = rmOsdMetaData.getHostname();
            path = rmOsdMetaData.getDevices();
        } catch (Throwable e) {
            throw new DsmsEngineException(e, ResultCode.NODE_REMOVEOSD_TASKERROR);
        }

        String taskMessage = "节点" + host + "移除磁盘，osdId:" + osdId + "，盘符:" + NodeDeviceManageDto.DEVICE_PATH_PREFIX + path;
        Task task = new AsyncTask(
                TaskTypeEnum.REMOVE_OSD.getName(),
                TaskTypeEnum.REMOVE_OSD.getType(),
                TaskStatusEnum.QUEUE.getStatus(),
                taskMessage,
                JSONUtil.toJsonStr(JSONUtil.toJsonStr(new NodeDeviceManageDto(osdId, host, path))));
        if (taskService.save(task)) {
            return removeOsdStepTask(task.getId(), osdId);
        }
        return false;
    }

    private boolean addOsdStepTask(int taskId, String host, String path) {
        Step createOsdStep = new AsyncStep(
                taskId,
                StepTypeEnum.CREATE_OSD.getName(),
                StepTypeEnum.CREATE_OSD.getType(),
                StepTypeEnum.CREATE_OSD.getName() + ",节点:" + host + "磁盘:" + path,
                TaskStatusEnum.QUEUE.getStatus());

        if (!stepService.save(createOsdStep)) {
            log.error("the task for create osd has been created failed");
            return false;
        }

        Step moveOsdStep = new AsyncStep(
                taskId,
                StepTypeEnum.MOVE_OSD_TO_DEFAULT.getName(),
                StepTypeEnum.MOVE_OSD_TO_DEFAULT.getType(),
                StepTypeEnum.MOVE_OSD_TO_DEFAULT.getName(),
                TaskStatusEnum.QUEUE.getStatus());

        if (!stepService.save(moveOsdStep)) {
            log.error("the task for purge osd has been created failed");
            return false;
        }

        return true;
    }

    private boolean removeOsdStepTask(int taskId, int osdId) {
        Step stopOsdStep = new AsyncStep(
                taskId,
                StepTypeEnum.STOP_OSD.getName(),
                StepTypeEnum.STOP_OSD.getType(),
                StepTypeEnum.STOP_OSD.getName() + OSD + osdId,
                TaskStatusEnum.QUEUE.getStatus());

        if (!stepService.save(stopOsdStep)) {
            log.error("the task for stop has been created failed");
            return false;
        }

        Step purgeOsdStep = new AsyncStep(
                taskId,
                StepTypeEnum.PURGE_OSD.getName(),
                StepTypeEnum.PURGE_OSD.getType(),
                StepTypeEnum.PURGE_OSD.getName() + OSD + osdId,
                TaskStatusEnum.QUEUE.getStatus());

        if (!stepService.save(purgeOsdStep)) {
            log.error("the task for purge osd has been created failed");
            return false;
        }

        Step zapOsdStep = new AsyncStep(
                taskId,
                StepTypeEnum.ZAP_OSD.getName(),
                StepTypeEnum.ZAP_OSD.getType(),
                StepTypeEnum.ZAP_OSD.getName() + OSD + osdId,
                TaskStatusEnum.QUEUE.getStatus());

        if (!stepService.save(zapOsdStep)) {
            log.error("the task for zap osd has been created failed");
            return false;
        }

        return true;
    }

    @Override
    public boolean validateNode(String nodeName) {
        List<ResponseMon> monList = null;
        boolean nodeExist = false;

        try {
            monList = nodeApi.getNodeList(RemoteCallUtil.generateRemoteRequest());
        } catch (Throwable e) {
            log.warn("get node list from dsms-storage fail,fail message:{}", e.getMessage(), e);
            return false;
        }
        if (ObjectUtils.isEmpty(monList)) {
            return false;
        }
        for (ResponseMon mon : monList) {
            if (mon.getName().equals(nodeName)) {
                nodeExist = true;
                break;
            }
        }

        return nodeExist;
    }


    @Override
    public boolean validateDeviceAvailable(String nodeName, String devicePath) {
        Optional<OrchDeviceLsResult> monOrchDevice;
        boolean deviceAvailable = false;

        try {
            List<OrchDeviceLsResult> orchDeviceLs = nodeApi.getOrchDeviceLs(RemoteCallUtil.generateRemoteRequest(), null);
            monOrchDevice = orchDeviceLs.stream().filter(orchDevice -> orchDevice.getAddr().equals(nodeName)).findFirst();
        } catch (Throwable e) {
            log.warn("get device status from dsms-storage fail,fail message:{}", e.getMessage(), e);
            return false;
        }
        if (monOrchDevice.isEmpty()) {
            return false;
        }
        List<Device> devices = Device.orchDeviceLsResultParseDevice(monOrchDevice.get());
        for (Device device : devices) {
            if (device.getDevicePath().equals(devicePath) && DeviceStatusEnum.AVAILABLE.getStatus()
                    .equals(device.getDeviceStatus())) {
                deviceAvailable = true;
                break;
            }
        }

        return deviceAvailable;
    }

    @Override
    public boolean validateOsdId(Integer osdId) {
        boolean osdExist = false;
        try {
            OsdDfResult osdDf = nodeApi.getOsdDf(RemoteCallUtil.generateRemoteRequest());
            List<Device> osdDfDevice = Device.osdDfResultParseDevice(osdDf);
            for (Device deviceDf : osdDfDevice) {
                if (deviceDf.getOsdId().equals(osdId)) {
                    osdExist = true;
                    break;
                }
            }
        } catch (Throwable e) {
            log.warn("validate osd id from dsms-storage fail", e);
            return false;
        }

        return osdExist;
    }

    private boolean validateOsdUsed(Integer osdId) {
        boolean osdUsed = true;
        try {
            OSDResult osd = osdApi.getOsdById(RemoteCallUtil.generateRemoteRequest(), osdId);
            if (!ObjectUtils.isEmpty(osd.getCrushLocation().getRoot()) && Objects.equals(osd.getCrushLocation().getRoot(), CrushmapConst.DEFAULT_ROOT_BUCKET)) {
                    osdUsed = false;
                }

            OsdDfResult osdDfResult = nodeApi.getOsdDf(RemoteCallUtil.generateRemoteRequest());
            List<Device> osdDfDevice = Device.osdDfResultParseDevice(osdDfResult);
            Optional<Device> first = osdDfDevice.stream().filter(osdDevice -> osdDevice.getOsdId().equals(osdId)).findFirst();
            if (first.isPresent() && first.get().getDevicePgs() > PG_NUM_ZERO) {
                osdUsed = true;
            }
        } catch (Throwable e) {
            log.warn("validate osd used from dsms-storage failed, fail reason:{}", e.getMessage(), e);
        }
        return osdUsed;
    }

    @NotNull
    private List<Device> filterClusterOsd(List<OSDMetadataResult> osdMetadatas, String name, List<Device> devices) {
        String clusterFsid = clusterService.getCurrentBindCluster().getFsid();
        List<Device> resultDevices = new ArrayList<>();
        for (Device device : devices) {
            if (DeviceStatusEnum.AVAILABLE.getStatus().equals(device.getDeviceStatus())) {
                resultDevices.add(device);
            } else if (DeviceStatusEnum.NOT_AVAILABLE.getStatus().equals(device.getDeviceStatus())) {
                //if device not available.
                //if device not this cluster
                if (ObjectUtils.isEmpty(device.getClusterFsid()) || !Objects.equals(device.getClusterFsid(), clusterFsid)) {
                    continue;
                }
                //if this cluster have no osd
                if (ObjectUtils.isEmpty(osdMetadatas)) {
                    continue;
                }
                //if device have osd id , but device is not this cluster osd.
                try {
                    for (OSDMetadataResult osdMetadata : osdMetadatas) {
                        if (osdMetadata.getHostname().equals(name)
                                && osdMetadata.getId().equals(device.getOsdId())
                                && device.getDevicePath().contains(osdMetadata.getDevices())) {
                            OSDResult osd = osdApi.getOsdById(RemoteCallUtil.generateRemoteRequest(), device.getOsdId());
                            if (!ObjectUtils.isEmpty(osd.getCrushLocation().getRoot()) && !Objects.equals(osd.getCrushLocation().getRoot(), CrushmapConst.DEFAULT_ROOT_BUCKET)) {
                                    device.setUsedPool(osd.getCrushLocation().getRoot());
                                }

                            resultDevices.add(device);
                        }
                    }
                } catch (Throwable e) {
                    log.warn("get cluster osd info failed, fail reason:{}", e.getMessage(), e);
                }
            }
        }
        return resultDevices;
    }
}
